Многопоточное программирование в Java - Тимур Машнин
Класс ForkJoinPool является основой фреймворка и является реализацией интерфейса ExecutorService, управляя рабочими потоками и обеспечивая получение информации о состоянии пула потоков и производительности.
Рабочий поток может выполнять только одну задачу одномоментно, и ForkJoinPool не создает отдельный поток для каждой подзадачи.
Вместо этого каждый поток в пуле имеет свою собственную двойную очередь (или deque), в которой хранятся задачи.
Эта архитектура обеспечивает балансировку рабочей нагрузки потока с помощью алгоритма воровства работы.
Проще говоря — свободные потоки пытаются «украсть» работу из очередей занятых потоков.
По умолчанию рабочий поток получает задачи из головы собственной очереди.
Когда эта очередь пустая, поток берет задачу из хвоста очереди другого занятого потока или из глобальной очереди на входе.
Этот подход минимизирует вероятность того, что потоки будут конкурировать за задачи.
Это также уменьшает количество раз, когда поток должен искать работу, так как он будет работать в первую очередь с самыми большими доступными кусками работы.
ForkJoinTask — это базовый тип задач, выполняемых внутри ForkJoinPool.
На практике должен быть расширен один из двух его подклассов: RecursiveAction для задач void и RecursiveTask для задач, возвращающих значение.
Оба эти класса имеют абстрактный метод compute, в котором должна быть определена логика задачи.
Общий шаблон метода compute следующий — если размер задачи меньше порогового значения, задача решается без параллелизма.
Если больше, тогда задача разбивается на подзадачи, как минимум две.
Затем к подзадачам применяется метод fork класса ForkJoinTask, который помещает задачу в рабочую очередь, где, либо текущий поток вызовет метод compute задачи, либо задачу украдет другой поток, который вызовет метод compute задачи.
После вызова метода compute задачи, она опять разделится и все повторится до тех пор, пока размер задачи не станет маленьким.
После вызова метода fork, который возвращает сразу и является асинхронным, к подзадаче применяется метод join класса ForkJoinTask, который возвращает результат вычисления, когда он готов, то есть этот метод является блокирующим.
После возврата результатов, они объединяются.
В этом примере выполняется суммирование всех элементов массива с использованием параллелизма для параллельной обработки различных сегментов из 5000 элементов.
Здесь, если размер массива меньше 5000 элементов, суммирование производится без параллелизма в цикле.
Если размер больше порогового значения, задача разделяется пополам.
К одной части рекурсивно применяется метод fork, а к другой рекурсивно применяется метод compute.
Затем к той части, к которой применялся метод fork, применяется метод join, ожидающий результата.
И в конце концов результаты складываются вместе.
Сам по себе вызов метода fork не создает никакого потока и не вызывает выполнение задачи.
Для этого нужно создать пул потоков ForkJoinPool и послать в его метод задачу ForkJoinTask.
Что мы и делаем в методе sumArray.
Создать пул потоков ForkJoinPool можно методом commonPool, или можно конструктором, указав уровень параллелизма или количество ядер процессора для выполнения вычислений.
Если в конструкторе ничего не указывать, уровень параллелизма будет равен количеству доступных ядер процессора.
Объект, созданный с помощью конструктора, должен быть статическим, чтобы избежать создание своего пула для каждой задачи.
Метод commonPool делает это автоматически.
Класс ForkJoinPool имеет четыре метода, принимающих задачу ForkJoinTask.
Асинхронные методы submit или execute помещают задачу в пул потоков.
Для получения результата нужно затем вызвать метод join.
Метод invoke помещает задачу в пул и ожидает результата.
Метод invokeAll позволяет отправить коллекцию задач в пул потоков.
При применении фреймворка fork/join возникает вопрос — как определить пороговое значение метода compute.
Порог можно вычислить как отношение размера задачи к количеству ядер, умноженных на желаемое количество задач на один поток.
load factor обычно берут 8 или 16.
Хорошая практика, когда порог предполагает количество базовых шагов вычисления больше 100 и меньше 10000.
Теперь рассмотрим код, где мы рекурсивно решаем подзадачи.
Может показаться более естественным вызывать fork дважды для двух подзадач, а затем дважды вызвать join.
Или вызвать ожидающий результат метод invokeAll, а затем дважды вызвать join.
Или вызвать метод invoke, который объединяет fork и join в одном вызове.
Однако это будет менее эффективно, чем просто вызывать compute, так как вы увеличиваете накладные расходы на создание дополнительных параллельных задач, чем это нужно.
Кроме того, здесь важен порядок вызова методов.
Сначала вызвать fork, который сразу вернет.
Затем вызвать compute и получить результат.
Затем вызвать join и получить результат.
Однако при создании задачи RecursiveAction имеет смысл использовать метод invokeAll, так как нам здесь не нужен результат, а нужно просто выполнить задачи параллельно.
Самостоятельное задание
Создайте задачу RecursiveTask и выведите статистику — время выполнения, количество потоков, количество задач потоков, количество ожидающих задач, количество потоков, которые в настоящее время воруют или выполняют задачи, количество запущенных потоков, количество задач, украденных из рабочей очереди одного потока другим.
Создайте задачу RecursiveAction и сравните эффективность метода invokeAll и fork + compute + join.
Надо сказать, что Java 8 добавляет в интерфейс ExecutorService метод newWorkStealingPool, который фактически возвращает пул потоков ForkJoinPool.
То есть этот метод не создает никакого нового пула потоков, а отправляет нас к фреймворку fork/join.
Помимо классов RecursiveAction и RecursiveTask, Java 8 вводит класс CountedCompleter, который реализует класс ForkJoinTask.
Класс CountedCompleter обеспечивает механизм для выполнения метода после завершения всех подзадач.
Это метод onCompletion класса CountedCompleter.
Для вызова метода onCompletion, в той части метода compute, где идет вычисление подзадачи, достигшей минимального размера, мы вызываем метод tryComplete, который вызывает метод onCompletion.
Когда задача завершает метод onCompletion, вызывается метод tryComplete родительской подзадачи, и так далее до источника.
Если необходимо вернуть результат, переопределяется метод getRawResult класса CountedCompleter.
Каждый раз, когда объявляется новая подзадача, следует вызывать метод addToPendingCount.
Благодаря этому изменяется внутренний счетчик ожидающих задач.
Этот счетчик определяет, были ли выполнены задачи.
Если это не так, тогда счетчик уменьшается.
Когда задачи завершаются и счетчик обнуляется и вызывается метод tryComplete, событие завершения отправляется во все задачи CountedCompleter.
Если метод tryComplete не будет